Skip to content

Conversation

@beliefer
Copy link
Contributor

What changes were proposed in this pull request?

This is a ANSI SQL and feature id is F861

<query expression> ::=
[ <with clause> ] <query expression body>
[ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ]

<result offset clause> ::=
OFFSET <offset row count> { ROW | ROWS }

For example:

SELECT customer_name, customer_gender FROM customer_dimension 
   WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name;
    customer_name     | customer_gender
----------------------+-----------------
 Amy X. Lang          | Female
 Anna H. Li           | Female
 Brian O. Weaver      | Male
 Craig O. Pavlov      | Male
 Doug Z. Goldberg     | Male
 Harold S. Jones      | Male
 Jack E. Perkins      | Male
 Joseph W. Overstreet | Male
 Kevin . Campbell     | Male
 Raja Y. Wilson       | Male
 Samantha O. Brown    | Female
 Steve H. Gauthier    | Male
 William . Nielson    | Male
 William Z. Roy       | Male
(14 rows)

SELECT customer_name, customer_gender FROM customer_dimension 
   WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8;
   customer_name   | customer_gender
-------------------+-----------------
 Kevin . Campbell  | Male
 Raja Y. Wilson    | Male
 Samantha O. Brown | Female
 Steve H. Gauthier | Male
 William . Nielson | Male
 William Z. Roy    | Male
(6 rows)

There are some mainstream database support the syntax.

Druid
https://druid.apache.org/docs/latest/querying/sql.html#offset

Kylin
http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX

Exasol
https://docs.exasol.com/sql/select.htm

Greenplum
http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html

MySQL
https://dev.mysql.com/doc/refman/5.6/en/select.html

Monetdb
https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT

PostgreSQL
https://www.postgresql.org/docs/11/queries-limit.html

Sqlite
https://www.sqlite.org/lang_select.html

Vertica
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset

The description for design:
1. Consider OFFSET as the special case of LIMIT. For example:
SELECT * FROM a limit 10; similar to SELECT * FROM a limit 10 offset 0;
SELECT * FROM a offset 10; similar to SELECT * FROM a limit -1 offset 10;
2. Because the current implement of LIMIT has good performance. For example:
SELECT * FROM a limit 10; parsed to the logic plan as below:

GlobalLimit (limit = 10)
|--LocalLimit (limit = 10)

and then the physical plan as below:

GlobalLimitExec (limit = 10) // Take the first 10 rows globally
|--LocalLimitExec (limit = 10) // Take the first 10 rows locally

This operator reduce massive shuffle and has good performance.
Sometimes, the logic plan transformed to the physical plan as:

CollectLimitExec (limit = 10) // Take the first 10 rows globally

If the SQL contains order by, such as SELECT * FROM a order by c limit 10;.
This SQL will be transformed to the physical plan as below:

TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally

Based on this situation, this PR produces the following operations. For example:
SELECT * FROM a limit 10 offset 10; parsed to the logic plan as below:

GlobalLimit (limit = 10)
|--LocalLimit (limit = 10)
   |--Offset (offset = 10)

After optimization, the above logic plan will be transformed to:

GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause
|--LocalLimit (limit = 20)   // 10 + offset = 20

and then the physical plan as below:

GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally
|--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally

Sometimes, the logic plan transformed to the physical plan as:

CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally

If the SQL contains order by, such as SELECT * FROM a order by c limit 10 offset 10;.
This SQL will be transformed to the physical plan as below:

TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally

3.In addition to the above, there is a special case that is only offset but no limit. For example:
SELECT * FROM a offset 10; parsed to the logic plan as below:

Offset (offset = 10) // Only offset clause

If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it.

A balanced idea is add a configuration item spark.sql.forceUsingOffsetWithoutLimit to force running query when user knows the offset is small enough. The default value of spark.sql.forceUsingOffsetWithoutLimit is false. This PR just came up with the idea so that it could be implemented at a better time in the future.

Note: The origin PR to support this feature is #25416.
Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature.

Why are the changes needed?

new feature

Does this PR introduce any user-facing change?

'No'

How was this patch tested?

Exists and new UT

@beliefer
Copy link
Contributor Author

ping @cloud-fan

@dtenedor
Copy link
Contributor

@beliefer Hi Jiaan, I am Daniel. I work for Databricks and contribute to Spark (but am not a full committer like Wenchen). I can help review this PR as well. Please feel free to @mention me as needed.

Copy link
Contributor

@dtenedor dtenedor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you again for your effort in getting this implemented! It will be very useful for the Spark system.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove commented-out code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems I picked the code incorrectly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These queries remain commented-out, but it seems like they should be possible to test now. Can we uncomment them and enable as either positive or negative tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According the discussion offline between @cloud-fan and me. we not plan to support offset without limit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the parser it looks like we accept any expression for the OFFSET, but here we call asInstanceOf[Int]. Can we have an explicit check that this expression has integer type with an appropriate error message if not, and an accompanying test case that covers it?

Copy link
Contributor Author

@beliefer beliefer Apr 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, We check it with checkLimitLikeClause("offset", offsetExpr) on line 432.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG. Will the checkLimitLikeClause execute before this asInstanceOf[Int] call here? If so, we are OK. Otherwise, we would receive an exception here, which might result in an confusing error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It's OK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit -> the LIMIT clause
offset -> the OFFSET clause
Int.MaxValue -> the maximum 32-bit integer value (2,147,483,647)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> The OFFSET clause is only allowed...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line has a lot going on, would you mind breaking apart the logic into multiple lines with a comment describing the math?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also add test cases where:

  1. the OFFSET expression is not an integer value, e.g. "abc"
  2. the OFFSET expression is a long integer value
  3. the OFFSET expression is a constant but non-literal value, e.g. CASTing the current date to an integer, or some integer-valued UDF

Copy link
Contributor Author

@beliefer beliefer Apr 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 and 2 are OK.
3. current_date is foldable and UDF is not available in catalyst. In fact, postgreSQL/limit.sql already includes this test case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please indent +4 spaces for these args?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here?

@beliefer beliefer force-pushed the SPARK-28330 branch 2 times, most recently from 83f589a to 8a601f7 Compare April 12, 2022 08:04
Copy link
Contributor

@dtenedor dtenedor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again for all the help on this, the improved comments are helpful! This will be a useful feature for the system.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG. Will the checkLimitLikeClause execute before this asInstanceOf[Int] call here? If so, we are OK. Otherwise, we would receive an exception here, which might result in an confusing error message?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG, the logical Offset just removes the first N rows. When we combine it with a Limit in the physical plan, then we can think about these semantics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG.


override def doExecute(): RDD[InternalRow] = {
val rdd = child.execute().mapPartitions { iter => iter.take(limit + offset)}
rdd.zipWithIndex().filter(_._2 >= offset).map(_._1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we accomplish this in a simpler way by just doing a drop by offset on the rows instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, we can avoid shuffle by the way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at RDD.zipWithIndex, it doesn't do shuffle but it submits an extra job to get the number of records in each partition, which means it executes the compute task twice.

I think shuffle is safer here. Let's just follow what GlobalLimitExec does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

IntegerLiteral(limit),
IntegerLiteral(offset),
Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be limit + offset < conf.topKSortFallbackThreshold? Same below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah.

-- !query
SELECT '' AS zero, unique1, unique2, stringu1
FROM onek WHERE unique1 < 50
ORDER BY unique1 DESC LIMIT 8 OFFSET 99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add a few test cases with the LIMIT and OFFSET inside subqueries? Do the rows get filtered out at the table subquery boundary and then the rows from the OFFSET are not consumed by the remaining logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added into exists-orderby-limit.sql and in-limit.sql

-- !query
SELECT '' AS five, unique1, unique2, stringu1
FROM onek
ORDER BY unique1 LIMIT 5 OFFSET 900
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a test case with OFFSET 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added into limit.sql

-- !query
SELECT '' AS eleven, unique1, unique2, stringu1
FROM onek WHERE unique1 < 50
ORDER BY unique1 DESC LIMIT 20 OFFSET 39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a test case with a LIMIT + OFFSET following each of the major operators, e.g. aggregation, join, union all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added into in-limit.sql

@sigmod
Copy link
Contributor

sigmod commented Apr 13, 2022

cc @jchen5

@dtenedor
Copy link
Contributor

Note: this test failure seems like a general breakage in the GitHub REST infrastructure for Spark at the moment, and not anything related to your PR:

image

@jchen5
Copy link
Contributor

jchen5 commented Apr 14, 2022

Thanks for implementing this!

For the OFFSET with not LIMIT case, you say:

If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it.

Would it have worse performance compared to the same query without OFFSET? In theory OFFSET should be no worse than a full no-LIMIT no-OFFSET query because we can just skip the first n rows? However, I agree that this doesn't seem like an important case and it's ok not to support it for now.


/**
* Skip the first `offset` elements then take the first `limit` of the following elements in
* the child's single output partition.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can there be multiple output partitions in the case that the child was not sorted?

In the case it wasn't sorted, we can return an arbitrary collection of rows of the correct size, so it doesn't really matter, just trying to understand the invariants.

Copy link
Contributor

@jchen5 jchen5 Apr 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like if the child has multiple partitions, zipWithIndex will index starting with all the rows in the first partition, then the next partition etc. I believe this is fine because in a case with multiple partitions, that means the child data isn't sorted. Could you add a comment about this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implement only respect the partition index. If the child was not sorted, the output looks disordered.

@dtenedor
Copy link
Contributor

Hi Jiaan, I just wanted to mention that I will away from work on vacation for two more additional days :) apologies for the delay. I will resume the review the following Monday. Thanks again for all your hard work on this PR, we appreciate it.


case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this check to checkOutermostOffset and rename it to checkOffsetOperator? It's better to have a central place to check offset position.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

}

/**
* Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]] operators into one,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

combines two adjacent [[Offset]] operators into one

where does it happen?


errorTest(
"num_rows in offset clause must be equal to or greater than 0",
listRelation.offset(-1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
listRelation.offset(-1),
testRelation.offset(-1),

CollectLimitExec(limit, planLater(child)) :: Nil
CollectLimitExec(limit, 0, planLater(child)) :: Nil
case GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Sort(order, true, child)) if limit < conf.topKSortFallbackThreshold =>
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>

TakeOrderedAndProjectExec(limit, offset, order, child.output, planLater(child)) :: Nil
case GlobalLimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
if limit < conf.topKSortFallbackThreshold =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// collect the first `limit` + `offset` elements and then to drop the first `offset` elements.
// For example: limit is 1 and offset is 2 and the child output two partition.
// The first partition output [1, 2, 3] and the Second partition output [4, 5].
// Then [1, 2, 3] or [4, 5, 1] will be taken and output [3] or [1].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[1, 2, 3] or [4, 5, 1] are you sure? AFAIK RDD.take will collect the results w.r.t. the partition order, so it should always be [1, 2, 3]

override def executeCollect(): Array[InternalRow] = {
val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
val data = child.execute().map(_.copy()).takeOrdered(limit + offset)(ord).drop(offset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, please avoid calling .drop when not necessary

(48) TakeOrderedAndProject
Input [14]: [cd_gender#27, cd_marital_status#28, cd_education_status#29, cnt1#39, cd_purchase_estimate#30, cnt2#40, cd_credit_rating#31, cnt3#41, cd_dep_count#32, cnt4#42, cd_dep_employed_count#33, cnt5#43, cd_dep_college_count#34, cnt6#44]
Arguments: 100, [cd_gender#27 ASC NULLS FIRST, cd_marital_status#28 ASC NULLS FIRST, cd_education_status#29 ASC NULLS FIRST, cd_purchase_estimate#30 ASC NULLS FIRST, cd_credit_rating#31 ASC NULLS FIRST, cd_dep_count#32 ASC NULLS FIRST, cd_dep_employed_count#33 ASC NULLS FIRST, cd_dep_college_count#34 ASC NULLS FIRST], [cd_gender#27, cd_marital_status#28, cd_education_status#29, cnt1#39, cd_purchase_estimate#30, cnt2#40, cd_credit_rating#31, cnt3#41, cd_dep_count#32, cnt4#42, cd_dep_employed_count#33, cnt5#43, cd_dep_college_count#34, cnt6#44]
Arguments: 100, 0, [cd_gender#27 ASC NULLS FIRST, cd_marital_status#28 ASC NULLS FIRST, cd_education_status#29 ASC NULLS FIRST, cd_purchase_estimate#30 ASC NULLS FIRST, cd_credit_rating#31 ASC NULLS FIRST, cd_dep_count#32 ASC NULLS FIRST, cd_dep_employed_count#33 ASC NULLS FIRST, cd_dep_college_count#34 ASC NULLS FIRST], [cd_gender#27, cd_marital_status#28, cd_education_status#29, cnt1#39, cd_purchase_estimate#30, cnt2#40, cd_credit_rating#31, cnt3#41, cd_dep_count#32, cnt4#42, cd_dep_employed_count#33, cnt5#43, cd_dep_college_count#34, cnt6#44]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we only print the new argument if it's not 0?

SELECT * FROM testdata LIMIT 2;
SELECT * FROM arraydata LIMIT 2;
SELECT * FROM mapdata LIMIT 2;
SELECT * FROM mapdata LIMIT 2 OFFSET 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why testing OFFSET 0 not 2? why only test mapdata?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we think the pgsql tests are sufficient for OFFSET, we don't need to touch this file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants